本文へスキップ
バージョン: 5.0

RocketMQ Connect 実践編 2

PostgreSQL ソース (CDC) -> RocketMQ Connect -> MySQL シンク (JDBC)

準備

RocketMQの起動

  1. Linux/Unix/Mac
  2. 64bit JDK 1.8以上;
  3. Maven 3.2.x以上;
  4. RocketMQ を起動します。

ヒント: ${ROCKETMQ_HOME} の場所に関する指示

bin-release.zip バージョン:/rocketmq-all-4.9.4-bin-release

source-release.zip バージョン:/rocketmq-all-4.9.4-source-release/distribution

Connectの起動

コネクタプラグインのコンパイル

Debezium RocketMQ コネクタ

$ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
$ mvn clean package -Dmaven.test.skip=true

コンパイル済みのDebezium PostgreSQL RocketMQコネクタパッケージをランタイムのロードディレクトリに移動します。コマンドは以下のとおりです:

mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins

JDBCコネクタ

コンパイル済みのJDBCコネクタパッケージをランタイムのロードディレクトリに移動します。コマンドは以下のとおりです:

$ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
$ mvn clean package -Dmaven.test.skip=true
cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins

Connectランタイムの起動

cd  rocketmq-connect

mvn -Prelease-connect -DskipTests clean install -U

設定ファイル`connect-standalone.conf`を変更します。主な設定は以下のとおりです。

$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ vim conf/connect-standalone.conf
$ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
$ vim conf/connect-standalone.conf
workerId=standalone-worker
storePathRootDir=/tmp/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678

autoCreateGroupEnable=false
clusterName="DefaultCluster"

# Core configuration, configure the plugin directory of the previously compiled debezium package here
# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
pluginPaths=/usr/local/connector-plugins
cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

Postgres イメージ

debeziumのPostgres Docker環境を使用して、Postgresデータベースをセットアップします。

# starting a pg instance
docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14

# bash into postgres instance
docker exec -ti postgres /bin/bash

Postgres情報 ポート:5432 アカウント:start_data_engineer/password 同期元データベース:bank.holding ターゲットデータベーステーブル:bank1.holding

MySQL イメージ

debeziumのMySQL Docker環境を使用して、MySQLデータベースをセットアップします。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9

MySQL情報

ポート:3306

アカウント:root/debezium

テストデータ

start_data_engineer/passwordアカウントでデータベースにログインします。

ソースデータベーステーブル:bank.holding

CREATE SCHEMA bank;
SET search_path TO bank,public;
CREATE TABLE bank.holding (
holding_id int,
user_id int,
holding_stock varchar(8),
holding_quantity int,
datetime_created timestamp,
datetime_updated timestamp,
primary key(holding_id)
);
ALTER TABLE bank.holding replica identity FULL;
insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
\q
insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
update bank.holding set holding_quantity = 300 where holding_id=1000;

ターゲットデータベーステーブル:bank1.holding

create database bank1;
CREATE TABLE holding (
holding_id int,
user_id int,
holding_stock varchar(8),
holding_quantity int,
datetime_created bigint,
datetime_updated bigint,
primary key(holding_id)
);

コネクタの起動

Debeziumソースコネクタの起動

同期元テーブルデータ:bank.holding 目的:Postgresのbinlogを解析し、共通のConnectRecordオブジェクトにカプセル化して、RocketMQトピックに送信します。

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d  '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
"connect.topicname": "debezium-postgres-source-01",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
"database.history.skip.unparseable.ddl": true,
"database.server.name": "bankserver1",
"database.port": 5432,
"database.hostname": "database ip",
"database.connectionTimeZone": "UTC",
"database.user": "start_data_engineer",
"database.dbname": "start_data_engineer",
"database.password": "password",
"table.whitelist": "bank.holding",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

JDBCシンクコネクタの起動

目的:トピックからデータを読み取り、JDBCプロトコルを使用してターゲットテーブルに書き込みます。

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
"connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
"max.task": "2",
"connect.topicnames": "debezium-postgres-source-01",
"connection.url": "jdbc:mysql://database ip:3306/bank1",
"connection.user": "root",
"connection.password": "debezium",
"pk.fields": "holding_id",
"table.name.from.header": "true",
"pk.mode": "record_key",
"insert.mode": "UPSERT",
"db.timezone": "UTC",
"table.types": "TABLE",
"errors.deadletterqueue.topic.name": "dlq-topic",
"errors.log.enable": "true",
"errors.tolerance": "ALL",
"delete.enabled": "true",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

上記2つのコネクタタスクの作成後、start_data_engineer/passwordアカウントを使用してデータベースにログインします。

ソースデータベーステーブル`bankholding`に対する追加、削除、変更は、ターゲットテーブル`bank1.holding`に同期されます。